/*
 * Copyright (c) 2020 - present, Inspur Genersoft Co., Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.iec.edp.caf.rpc.remote.grpc;

import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.iec.edp.caf.boot.context.CAFBizContextHolder;
import io.iec.edp.caf.commons.core.SerializerFactory;
import io.iec.edp.caf.commons.core.enums.SerializeType;
import io.iec.edp.caf.commons.exception.CAFRuntimeException;
import io.iec.edp.caf.commons.exception.ExceptionLevel;
import io.iec.edp.caf.commons.exception.entity.DefaultExceptionProperties;
import io.iec.edp.caf.commons.exception.entity.ExceptionErrorCode;
import io.iec.edp.caf.commons.utils.SpringBeanUtils;
import io.iec.edp.caf.commons.utils.StringUtils;
import io.iec.edp.caf.core.context.ICAFContextService;
import io.iec.edp.caf.rpc.api.common.GspSerializeType;
import io.iec.edp.caf.rpc.api.event.RpcServerEventBroker;
import io.iec.edp.caf.rpc.api.grpc.GrpcInvokeServiceGrpc;
import io.iec.edp.caf.rpc.api.grpc.GrpcRequest;
import io.iec.edp.caf.rpc.api.grpc.GrpcResponse;
import io.iec.edp.caf.rpc.api.serialize.RpcSerializeUtil;
import io.iec.edp.caf.rpc.api.service.InternalServiceManageService;
import io.iec.edp.caf.rpc.api.service.RpcServer;
import io.iec.edp.caf.rpc.api.support.ConstanceVarible;
import io.iec.edp.caf.rpc.api.support.RpcThreadCacheHolder;
import io.iec.edp.caf.tenancy.api.context.RequestTenantContextHolder;
import io.iec.edp.caf.tenancy.api.context.RequestTenantContextInfo;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import lombok.var;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * gRPC server-side entry point that dispatches an incoming {@link GrpcRequest} to the
 * local {@link RpcServer}.
 *
 * <p>For each call it reads the GSP context from {@link RpcThreadCacheHolder}, rejects calls
 * that are not flagged as RPC-framework calls, fires the pre/post/exception server events,
 * sets the service unit and (optionally) the tenant for the duration of the invocation and
 * finally writes the result back through the {@link StreamObserver}.
 */
@Slf4j
public class GrpcServerInvoke extends GrpcInvokeServiceGrpc.GrpcInvokeServiceImplBase {

    // Collaborators are looked up once at construction time and never reassigned.
    private final ICAFContextService cafContextService = SpringBeanUtils.getBean(ICAFContextService.class);
    private final RpcServer rpcServer = SpringBeanUtils.getBean(RpcServer.class);
    private final RpcServerEventBroker serverEventBroker = SpringBeanUtils.getBean(RpcServerEventBroker.class);

    /**
     * Handles one remote invocation.
     *
     * <p>On success exactly one {@link GrpcResponse} carrying the string form of the service
     * result is emitted, followed by {@code onCompleted()}. On rejection or on an invocation
     * failure, {@code onError} is signalled on the observer and the exception is additionally
     * thrown from this method (checked exceptions are rethrown via {@link SneakyThrows}).
     *
     * @param request          the request carrying service id, parameters and version
     * @param responseObserver the observer used to send the response or error to the client
     * @throws CAFRuntimeException if the GSP context is missing or not flagged as an RPC call
     */
    @SneakyThrows
    @Override
    public void grpcRemoteInvoke(GrpcRequest request, StreamObserver<GrpcResponse> responseObserver) {
        String serviceId = request.getServiceId();
        LinkedHashMap<String, Object> params = new LinkedHashMap<>(request.getParamsMap());
        String version = request.getVersion();

        // GSP context is read from the thread cache (per the original comment it is put there by a filter).
        var context = RpcThreadCacheHolder.getValue(ConstanceVarible.GSP_CONTEXT);
        // The JSON deserializer yields an untyped HashMap; entries are treated as String -> String.
        @SuppressWarnings("unchecked")
        Map<String, String> gspContext = context == null
                ? null
                : (HashMap<String, String>) SerializerFactory.getDeserializer(SerializeType.Json).deserialize(context, HashMap.class);

        // A missing context is rejected the same way as a context without the RPC flag,
        // instead of failing later with a NullPointerException.
        if (!isFrameworkCall(gspContext)) {
            responseObserver.onError(Status.PERMISSION_DENIED.withDescription("Only allow invoke in iGIX rpc framework").asException());
            // NOTE(review): the exception is still thrown after onError, as in the original flow.
            throw new CAFRuntimeException(DefaultExceptionProperties.SERVICE_UNIT,
                    DefaultExceptionProperties.RESOURCE_FILE,
                    ExceptionErrorCode.illegalRPCCall,
                    new String[]{},
                    null, ExceptionLevel.Error, false);
        }

        String serviceUnitName = gspContext.get(ConstanceVarible.GSP_MSU);
        String tenantIdString = gspContext.get(ConstanceVarible.GSP_RPC_TENANT);
        Integer tenantId = !StringUtils.isEmpty(tenantIdString) ? Integer.valueOf(tenantIdString) : null;

        // These entries have been consumed above and are dropped from the context map.
        gspContext.remove(ConstanceVarible.GSP_RPC);
        gspContext.remove(ConstanceVarible.GSP_CONTEXT_ID);
        gspContext.remove(ConstanceVarible.GSP_MSU);

        // Hand the client-side event context to the pre-invoke listeners.
        var eventContext = gspContext.get(ConstanceVarible.GSP_RPC_CLIENT_ENVENT);
        var eventContextDict = SerializerFactory.getDeserializer(SerializeType.Json).deserialize(eventContext, HashMap.class);
        var localDict = new HashMap<String, Object>();
        localDict.put(ConstanceVarible.LOG_MSU, serviceUnitName);
        this.serverEventBroker.firePreRpcInvokeEvent(eventContextDict, params, localDict);

        // Prepare tenant and service unit for the invocation.
        setTenantAndMSU(tenantId, serviceUnitName);

        String result;
        try {
            var intermanager = SpringBeanUtils.getBean(InternalServiceManageService.class);
            var rpcServiceMethodDefinition = intermanager.getRpcMethodDefinition(serviceId);
            var parameters = RpcSerializeUtil.deSerializeParameter4RPC2(rpcServiceMethodDefinition, params);
            // NOTE(review): a null return from invokeService would raise an NPE here and be
            // reported to the client as INVALID_ARGUMENT — confirm whether that can happen.
            result = rpcServer.invokeService(serviceId, "", parameters, GspSerializeType.Protobuf).toString();
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            // Keep the original exception attached as the cause of the status.
            responseObserver.onError(Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asException());
            serverEventBroker.fireExceptionRpcInvokeEvent(eventContextDict, params, localDict, e);
            throw e;
        } finally {
            restoreTenant(tenantId);
        }

        // Let post-invoke listeners fill the server event context, then store it in the thread cache.
        var contextDict = new HashMap<String, String>();
        this.serverEventBroker.firePostRpcInvokeEvent(contextDict, result, localDict);
        var eventContextStr = SerializerFactory.getSerializer(SerializeType.Json).serializeToString(contextDict);
        RpcThreadCacheHolder.setValue(ConstanceVarible.GSP_RPC_SERVER_ENVENT, eventContextStr);

        GrpcResponse resp = GrpcResponse.newBuilder()
                .setMessage(result)
                .build();
        responseObserver.onNext(resp);
        responseObserver.onCompleted();
    }

    /**
     * Tells whether the GSP context marks the call as an RPC-framework call.
     *
     * @param gspContext the deserialized GSP context, may be {@code null}
     * @return {@code true} only if the context exists and its {@code GSP_RPC} entry equals
     *         "true" ignoring case; a missing, null or empty entry yields {@code false}
     */
    private static boolean isFrameworkCall(Map<String, String> gspContext) {
        return gspContext != null
                && "true".equalsIgnoreCase(gspContext.get(ConstanceVarible.GSP_RPC));
    }

    /**
     * Sets the current service unit and, when a tenant id is given, the request tenant context.
     *
     * @param tenantId    the tenant id to set, or {@code null} to leave the tenant context untouched
     * @param serviceUnit the service unit name passed to the context service
     */
    private void setTenantAndMSU(Integer tenantId, String serviceUnit) {
        // Set service unit and tenant information.
        cafContextService.setCurrentSU(serviceUnit);

        if (tenantId != null) {
            RequestTenantContextHolder.set(new RequestTenantContextInfo(tenantId));
        }
    }

    /**
     * Restores the request tenant context; a no-op when no tenant id was supplied,
     * mirroring the condition under which {@link #setTenantAndMSU} sets it.
     *
     * @param tenantId the tenant id that was used for the invocation, or {@code null}
     */
    private void restoreTenant(Integer tenantId) {
        if (tenantId != null) {
            RequestTenantContextHolder.restore();
        }
    }
}
